package drds.plus.executor.function.aggregate_function;

import drds.plus.executor.ExecuteContext;
import drds.plus.executor.function.ExtraFunction;
import drds.plus.executor.row_values.RowValues;
import drds.plus.executor.utils.Utils;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.FunctionType;
import drds.plus.sql_process.abstract_syntax_tree.expression.item.function.extra_function.IExtraFunction;
import drds.plus.sql_process.type.Type;

import java.util.List;


/**
 * <pre>
 * map/reduce模型
 * 1. sum会先map到所有相关的机器上
 * 2. reduce方法内做合并
 *
 * 比较特殊的是avg
 * 1. 首先它在map的时候，需要下面的节点统计count + sum.
 * 2. reduce则是进行avg计算的地方，进行count/sum处理
 *
 * 因为有map/reduce模型，所有带有函数计算的执行计划都被设定为： sort { where } 结构，也就是merge下面挂query的模型，
 * </pre>
 */
public abstract class AggregateFunction extends ExtraFunction implements IExtraFunction {

    protected Object result;

    public void setResult(Object result) {
        this.result = result;
    }

    public Object getResult() {
        return result;
    }

    public void clear() {
        result = null;
    }

    //
    public String getDataBaseFunction() {
        return function.getColumnName();
    }

    public FunctionType getFunctionType() {
        return FunctionType.aggregate_function;
    }

    /**
     * 外部执行器传递ResultSet中的row记录，进行function的map计算
     */
    public void map(ExecuteContext executeContext, RowValues rowData) throws RuntimeException {
        // 当前function需要的args 有些可能是函数，也有些是其他的一些数据
        List<Object> argList = getMapArgList(function);
        // 函数的input参数
        Object[] args = new Object[argList.size()];
        int index = 0;
        for (Object arg : argList) {
            args[index] = getArgValue(executeContext, arg, rowData);
            index++;
        }
        map(executeContext, args);
    }

    public abstract void map(ExecuteContext executeContext, Object[] args);

    /**
     * 获取Map函数的返回结果
     */
    public abstract Type getMapReturnType();
    //

    /**
     * 外部执行器传递ResultSet中的row记录，进行function的reduce计算
     */
    public void reduce(ExecuteContext executeContext, RowValues rowData) throws RuntimeException {

        // 函数的input参数
        List<Object> argList = this.getReduceArgList();
        Object[] args = new Object[argList.size()];
        // 目前认为所有scalar函数可下推
        for (int i = 0; i < argList.size(); i++) {
            String columnName = argList.get(i).toString();
            Object value = Utils.getValue(rowData, this.function.getTableName(), columnName, this.function.getAlias());
            args[i] = value;
        }
        reduce(executeContext, args);
    }

    public abstract void reduce(ExecuteContext executeContext, Object[] args);
}
